1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.junit.Assert.fail;
19 import static org.mockito.Matchers.any;
20 import static org.mockito.Mockito.*;
21
22 import java.util.Arrays;
23
24 import org.junit.*;
25
26 import rx.*;
27 import rx.Observable.OnSubscribe;
28 import rx.functions.Func1;
29 import rx.observers.TestSubscriber;
30 import rx.subjects.*;
31
32 public class OperatorTakeWhileTest {
33
34 @Test
35 public void testTakeWhile1() {
36 Observable<Integer> w = Observable.just(1, 2, 3);
37 Observable<Integer> take = w.takeWhile(new Func1<Integer, Boolean>() {
38 @Override
39 public Boolean call(Integer input) {
40 return input < 3;
41 }
42 });
43
44 @SuppressWarnings("unchecked")
45 Observer<Integer> observer = mock(Observer.class);
46 take.subscribe(observer);
47 verify(observer, times(1)).onNext(1);
48 verify(observer, times(1)).onNext(2);
49 verify(observer, never()).onNext(3);
50 verify(observer, never()).onError(any(Throwable.class));
51 verify(observer, times(1)).onCompleted();
52 }
53
54 @Test
55 public void testTakeWhileOnSubject1() {
56 Subject<Integer, Integer> s = PublishSubject.create();
57 Observable<Integer> take = s.takeWhile(new Func1<Integer, Boolean>() {
58 @Override
59 public Boolean call(Integer input) {
60 return input < 3;
61 }
62 });
63
64 @SuppressWarnings("unchecked")
65 Observer<Integer> observer = mock(Observer.class);
66 take.subscribe(observer);
67
68 s.onNext(1);
69 s.onNext(2);
70 s.onNext(3);
71 s.onNext(4);
72 s.onNext(5);
73 s.onCompleted();
74
75 verify(observer, times(1)).onNext(1);
76 verify(observer, times(1)).onNext(2);
77 verify(observer, never()).onNext(3);
78 verify(observer, never()).onNext(4);
79 verify(observer, never()).onNext(5);
80 verify(observer, never()).onError(any(Throwable.class));
81 verify(observer, times(1)).onCompleted();
82 }
83
84 @Test
85 public void testTakeWhile2() {
86 Observable<String> w = Observable.just("one", "two", "three");
87 Observable<String> take = w.takeWhile(new Func1<String, Boolean>() {
88 int index = 0;
89
90 @Override
91 public Boolean call(String input) {
92 return index++ < 2;
93 }
94 });
95
96 @SuppressWarnings("unchecked")
97 Observer<String> observer = mock(Observer.class);
98 take.subscribe(observer);
99 verify(observer, times(1)).onNext("one");
100 verify(observer, times(1)).onNext("two");
101 verify(observer, never()).onNext("three");
102 verify(observer, never()).onError(any(Throwable.class));
103 verify(observer, times(1)).onCompleted();
104 }
105
106 @Test
107 public void testTakeWhileDoesntLeakErrors() {
108 Observable<String> source = Observable.create(new OnSubscribe<String>() {
109 @Override
110 public void call(Subscriber<? super String> observer) {
111 observer.onNext("one");
112 observer.onError(new Throwable("test failed"));
113 }
114 });
115
116 source.takeWhile(new Func1<String, Boolean>() {
117 @Override
118 public Boolean call(String s) {
119 return false;
120 }
121 }).toBlocking().lastOrDefault("");
122 }
123
124 @Test
125 public void testTakeWhileProtectsPredicateCall() {
126 TestObservable source = new TestObservable(mock(Subscription.class), "one");
127 final RuntimeException testException = new RuntimeException("test exception");
128
129 @SuppressWarnings("unchecked")
130 Observer<String> observer = mock(Observer.class);
131 Observable<String> take = Observable.create(source).takeWhile(new Func1<String, Boolean>() {
132 @Override
133 public Boolean call(String s) {
134 throw testException;
135 }
136 });
137 take.subscribe(observer);
138
139
140 try {
141 source.t.join();
142 } catch (Throwable e) {
143 e.printStackTrace();
144 fail(e.getMessage());
145 }
146
147 verify(observer, never()).onNext(any(String.class));
148 verify(observer, times(1)).onError(testException);
149 }
150
151 @Test
152 public void testUnsubscribeAfterTake() {
153 Subscription s = mock(Subscription.class);
154 TestObservable w = new TestObservable(s, "one", "two", "three");
155
156 @SuppressWarnings("unchecked")
157 Observer<String> observer = mock(Observer.class);
158 Observable<String> take = Observable.create(w).takeWhile(new Func1<String, Boolean>() {
159 int index = 0;
160
161 @Override
162 public Boolean call(String s) {
163 return index++ < 1;
164 }
165 });
166 take.subscribe(observer);
167
168
169 try {
170 w.t.join();
171 } catch (Throwable e) {
172 e.printStackTrace();
173 fail(e.getMessage());
174 }
175
176 System.out.println("TestObservable thread finished");
177 verify(observer, times(1)).onNext("one");
178 verify(observer, never()).onNext("two");
179 verify(observer, never()).onNext("three");
180 verify(s, times(1)).unsubscribe();
181 }
182
183 private static class TestObservable implements Observable.OnSubscribe<String> {
184
185 final Subscription s;
186 final String[] values;
187 Thread t = null;
188
189 public TestObservable(Subscription s, String... values) {
190 this.s = s;
191 this.values = values;
192 }
193
194 @Override
195 public void call(final Subscriber<? super String> observer) {
196 System.out.println("TestObservable subscribed to ...");
197 observer.add(s);
198 t = new Thread(new Runnable() {
199
200 @Override
201 public void run() {
202 try {
203 System.out.println("running TestObservable thread");
204 for (String s : values) {
205 System.out.println("TestObservable onNext: " + s);
206 observer.onNext(s);
207 }
208 observer.onCompleted();
209 } catch (Throwable e) {
210 throw new RuntimeException(e);
211 }
212 }
213
214 });
215 System.out.println("starting TestObservable thread");
216 t.start();
217 System.out.println("done starting TestObservable thread");
218 }
219 }
220
221 @Test
222 public void testBackpressure() {
223 Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
224 @Override
225 public Boolean call(Integer t1) {
226 return t1 < 100;
227 }
228 });
229 TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
230 @Override
231 public void onStart() {
232 request(5);
233 }
234 };
235
236 source.subscribe(ts);
237
238 ts.assertNoErrors();
239 ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
240
241 ts.requestMore(5);
242
243 ts.assertNoErrors();
244 ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
245 }
246
247 @Test
248 public void testNoUnsubscribeDownstream() {
249 Observable<Integer> source = Observable.range(1, 1000).takeWhile(new Func1<Integer, Boolean>() {
250 @Override
251 public Boolean call(Integer t1) {
252 return t1 < 2;
253 }
254 });
255 TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
256
257 source.unsafeSubscribe(ts);
258
259 ts.assertNoErrors();
260 ts.assertReceivedOnNext(Arrays.asList(1));
261
262 Assert.assertFalse("Unsubscribed!", ts.isUnsubscribed());
263 }
264 }